dxapi
InstrumentMessage
class InstrumentMessage(object)
setTimestamp
def setTimestamp(timestamp: int)
Set timestamp for message.
Arguments:
timestamp (int) - epoch time in millis.
setDateTime
def setDateTime(timestamp: datetime)
Set timestamp for message from datetime.
Arguments:
timestamp (datetime) - datetime object.
getDateTime
def getDateTime()
Returns timestamp of message as datetime object.
setNanoTime
def setNanoTime(timestamp: int)
Set timestamp for message.
Arguments:
timestamp (int) - epoch time in nanos.
getNanoTime
def getNanoTime()
Returns timestamp of message (epoch time in nanos).
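The setters above take the same instant in three forms: a datetime, epoch milliseconds, and epoch nanoseconds. A minimal sketch of converting between them with integer arithmetic is shown here. The helper names are hypothetical and not part of dxapi, and treating naive datetimes as UTC is an assumption, since this reference does not say how setDateTime interprets them.

```python
from datetime import datetime, timezone

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def datetime_to_millis(dt: datetime) -> int:
    """Epoch milliseconds for dt (assumption: naive values are treated as UTC)."""
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    delta = dt - _EPOCH
    # Integer arithmetic avoids float rounding on large timestamps.
    return delta.days * 86_400_000 + delta.seconds * 1000 + delta.microseconds // 1000


def millis_to_nanos(millis: int) -> int:
    """Epoch milliseconds to epoch nanoseconds."""
    return millis * 1_000_000


def nanos_to_millis(nanos: int) -> int:
    """Epoch nanoseconds to epoch milliseconds (sub-millisecond part dropped)."""
    return nanos // 1_000_000
```

The resulting integers are the kind of values that setTimestamp (millis) and setNanoTime (nanos) are documented to accept.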
to_dict
def to_dict(attributes: list[str] = None) -> dict
Convert object attributes to a dictionary.
Arguments:
attributes (list[str], optional) - List of attribute names to include in the dictionary. If None, all public attributes are included.
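The behaviour described for to_dict can be illustrated with a small stand-alone function. This is only a sketch of the documented semantics, not the library's implementation; it assumes that "public" means an instance attribute whose name does not start with an underscore, and FakeTrade is a hypothetical stand-in for a message object.

```python
def to_dict_sketch(obj, attributes=None):
    """Return selected attributes of obj as a dict.

    Assumption: with attributes=None, every instance attribute whose name
    does not start with '_' counts as public.
    """
    if attributes is None:
        attributes = [name for name in vars(obj) if not name.startswith('_')]
    return {name: getattr(obj, name) for name in attributes}


class FakeTrade:
    """Hypothetical stand-in for a message object."""

    def __init__(self):
        self.symbol = 'AAPL'
        self.price = 101.5
        self._internal = 'hidden'
```

Calling to_dict_sketch(FakeTrade(), ['price']) keeps only the requested field, while the default call skips _internal.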
InstrumentType
class InstrumentType(object)
Represents instrument type.
Object of this class can be created from instrument type name (str), or int value:
Example:
```
fxType = dxapi.InstrumentType('FX')
indexType = dxapi.InstrumentType(dxapi.InstrumentType.INDEX)
```
Possible values:
```
EQUITY
OPTION
FUTURE
BOND
FX
INDEX
ETF
CUSTOM
SIMPLE_OPTION
EXCHANGE
TRADING_SESSION
STREAM
DATA_CONNECTOR
EXCHANGE_TRADED_SYNTHETIC
SYSTEM
CFD
UNKNOWN
```
InstrumentIdentity
class InstrumentIdentity(object)
Represents an instrument; consists of an instrument type and an instrument symbol.
Example:
```
appleId = dxapi.InstrumentIdentity(dxapi.InstrumentType('EQUITY'), 'AAPL')
btcUsdId = dxapi.InstrumentIdentity(dxapi.InstrumentType.FX, 'BTC/USD')
```
StreamScope
class StreamScope(object)
Determines the scope of a stream's durability, if any.
Example:
```
scope = dxapi.StreamScope('TRANSIENT')
```
Possible values:
```
DURABLE,
EXTERNAL_FILE,
TRANSIENT,
RUNTIME
```
WriteMode
class WriteMode(object)
APPEND: Adds only new data to a stream, without truncation.
REPLACE: Adds data to a stream and removes previous data older than the first message time [truncate(first message time + 1)].
REWRITE: Default. Adds data to a stream and removes previous data by truncating at the first message time [truncate(first message time)].
TRUNCATE: The stream is truncated every time the loader writes a message earlier than the last message time.
INSERT: Inserts new data into a stream without truncation.
Example:
```
mode = dxapi.WriteMode('TRUNCATE')
```
Possible values:
```
APPEND,
REPLACE,
REWRITE,
TRUNCATE,
INSERT
```
DecimalType
class DecimalType(enum.Enum)
Specifies output type for decimal fields. Possible values:
```
DOUBLE,
DECIMAL
```
SelectionOptions
class SelectionOptions(object)
Options for selecting data from a stream.
Example:
```
so = dxapi.SelectionOptions()
so._from = 0
so.to = 100000
so.useCompression = False
so.live = True
so.reverse = False
so.allowLateOutOfOrder = True
so.realTimeNotification = True
so.minLatency = False
```
Properties:
_from (int) - Start timestamp in millis.
to (int) - End timestamp in millis.
useCompression (bool) - Use compression.
live (bool) - Instead of returning false from next() at the end of the stream, wait for live data to be added.
reverse (bool) - Specify cursor direction.
allowLateOutOfOrder (bool) - Output out-of-order late messages. TimeBase consumers receive the historical messages they requested strictly ordered by time. When new messages arrive in the middle of a consumer's session (so-called 'live' mode), a newly arrived message may have a timestamp in a time region the consumer has already read. In this case the flag allows the consumer to receive these 'late' messages even though they are out of order with respect to the current baseline time.
NOTE - Late messages that are timestamped prior to the consumer's select time or last reset time will not be delivered even with this flag enabled.
rebroadcast (bool) - Allow rebroadcast of unique messages on open/reset cursors.
realTimeNotification (bool) - Enables/disables sending system messages when the cursor switches from historical to real-time mode.
minLatency (bool) - Try to receive messages ASAP, with minimal buffering. Can potentially reduce max throughput and increase CPU use / network bandwidth usage.
decimalType (DecimalType enum) - Specifies output type for decimal fields.
withSpaces
def withSpaces(spaces: 'list[str]') -> None
List of spaces to select data from. If set to None then data from all spaces is loaded.
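Because _from and to are epoch milliseconds, a small helper can keep datetime handling in one place. The sketch below is illustrative only: set_time_window is a hypothetical name, and a types.SimpleNamespace stands in for dxapi.SelectionOptions so the snippet runs without a server. It only assigns the two documented properties.

```python
from datetime import datetime, timezone
from types import SimpleNamespace


def set_time_window(options, start: datetime, end: datetime):
    """Fill options._from / options.to (epoch millis) from aware datetimes."""
    if start.tzinfo is None or end.tzinfo is None:
        raise ValueError('pass timezone-aware datetimes to avoid local-time surprises')
    if start > end:
        raise ValueError('start must not be after end')
    options._from = round(start.timestamp() * 1000)
    options.to = round(end.timestamp() * 1000)
    return options


# Stand-in object; with the real library this would be dxapi.SelectionOptions().
opts = set_time_window(
    SimpleNamespace(),
    datetime(2020, 1, 1, tzinfo=timezone.utc),
    datetime(2020, 1, 2, tzinfo=timezone.utc),
)
```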
LoadingOptions
class LoadingOptions(object)
Options for loading data into a stream.
Example:
```
lo = dxapi.LoadingOptions()
lo.writeMode = dxapi.WriteMode('TRUNCATE')
lo.space = 'myspace'
lo.minLatency = False
```
Properties:
writeMode (WriteMode) - See the WriteMode class description.
minLatency (bool) - Try to send messages ASAP, with minimal buffering. Can potentially reduce max throughput and increase CPU use / network bandwidth usage.
space (str) - Data partition. Contains a unique number of instruments or time ranges.
StreamOptions
class StreamOptions(object)
Stream definition attributes.
Example:
```
so = dxapi.StreamOptions()
so.name('Test Name')
so.description('Test Description')
so.owner('Test Owner')
so.metadata(schema)
so.scope = dxapi.StreamScope('TRANSIENT')
so.distributionFactor = 1
so.polymorphic = True
so.periodicity = 'STATIC'
so.unique = True
so.duplicatesAllowed = False
so.version = '4.3'
db.createStream(key, so)
```
Properties:
scope (StreamScope) - Determines persistent properties of a stream.
distributionFactor (int) - The number of M-files into which to distribute the data. Supply MAX_DISTRIBUTION to keep a separate file for each instrument (default).
duplicatesAllowed (bool) - Indicates that the loader will ignore binary-similar messages (for 'unique' streams only).
highAvailability (bool) - High availability durable streams are cached on startup.
unique (bool) - Unique streams maintain an in-memory cache of recent messages. This concept assumes that stream messages have some field(s) marked as the primary key. The primary key may be a simple field (e.g. symbol) or composite (e.g. symbol and portfolio ID). For each key the TimeBase runtime maintains a copy of the last message received for this key (cache). Each new consumer receives a snapshot of the current cache at the beginning of a live data subscription.
polymorphic (bool) - Whether the stream is configured as polymorphic.
periodicity (str) - Stream periodicity, if known.
version (str) - Stream format version. Supported versions are: '5.0' - Editable past format, TS data file; '4.3' - 'Classic' storage format, allocation free.
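The last-message-per-key cache described for unique streams can be pictured with a toy model. This is a local illustration of the concept only, not how the TimeBase runtime is implemented; LastValueCache and the dict-shaped messages are hypothetical.

```python
class LastValueCache:
    """Toy model: keep the last message seen for each primary key."""

    def __init__(self, key_fields):
        self._key_fields = tuple(key_fields)  # simple or composite key
        self._last = {}

    def update(self, message: dict) -> None:
        key = tuple(message[field] for field in self._key_fields)
        self._last[key] = message  # newer message replaces the cached one

    def snapshot(self) -> list:
        """What a new consumer would receive before live data starts."""
        return list(self._last.values())


cache = LastValueCache(['symbol', 'portfolio'])
cache.update({'symbol': 'AAPL', 'portfolio': 'P1', 'qty': 10})
cache.update({'symbol': 'MSFT', 'portfolio': 'P1', 'qty': 5})
cache.update({'symbol': 'AAPL', 'portfolio': 'P1', 'qty': 15})
```

After the three updates the snapshot holds two messages, one per composite key, with the later AAPL message replacing the earlier one.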
name
def name(name: str = None) -> str
Optional user-readable name.
description
def description(description: str = None) -> str
Optional multi-line description.
owner
def owner(owner: str = None) -> str
Optional owner of the stream. During stream creation it is set to the authenticated user name.
location
def location(location: str = None) -> str
Location of the stream (by default null). When defined this attribute provides alternative stream location (rather than default location under QuantServerHome)
distributionRuleName
def distributionRuleName(distributionRuleName: str = None) -> str
Class name of the distribution rule
metadata
def metadata(metadata: 'SchemaDef | str' = None) -> 'SchemaDef'
Stream metadata (SchemaDef or json string) as dxapi.SchemaDef object.
schema
def schema(metadata: 'SchemaDef | str' = None) -> 'SchemaDef'
Stream metadata (SchemaDef or json string) as dxapi.SchemaDef object. Alias for metadata().
metadataXml
def metadataXml(metadataXml: str = None) -> str
Stream metadata in XML format.
QueryParameter
class QueryParameter(object)
Input parameter definition for a prepared statement.
LockType
class LockType(object)
Represents lock type.
An object of this class can be created from a lock type name (str) or an int value:
Example:
```
readLockType = dxapi.LockType('READ')
writeLockType = dxapi.LockType('WRITE')
writeLockType = dxapi.LockType(dxapi.LockType.READ_LOCK)
```
Possible values:
```
READ,
WRITE
```
LockOptions
class LockOptions(object)
Options for stream locking.
Example:
```
lo = dxapi.LockOptions()
lo.type = dxapi.LockType('WRITE')
lo.startTime = 0
lo.endTime = 1000
```
Properties:
type (LockType) - type of lock;
startTime (int) - Start time for ranged write locks (ms);
endTime (int) - End time for ranged write locks (ms).
ExecutionStatus
class ExecutionStatus(object)
Background process status.
An object of this class can be created from a status name (str) or an int value.
Possible values:
```
NONE,
RUNNING,
COMPLETED,
ABORTED,
FAILED
```
BackgroundProcessInfo
class BackgroundProcessInfo(object)
Background process info.
Example:
```
import time

stream.changeSchema(schema, None, defaults, True)
# Poll until the background schema change reports completion.
while str(stream.backgroundProcessInfo().status) != 'Completed':
    print('Waiting for task to finish...')
    time.sleep(1)
```
Properties:
status (ExecutionStatus) - status of process;
progress (double) - progress of process (from 0.0 to 1.0);
startTime (int) - task start time in millis;
endTime (int) - task end time in millis.
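A polling loop like the one in the example never ends if the task fails or is aborted. The sketch below adds a timeout and a failure check. wait_for_background_process and FakeSchemaStream are hypothetical names. The helper relies only on backgroundProcessInfo().status as documented here, and compares the status text case-insensitively because the exact string form of ExecutionStatus is an assumption.

```python
import time
from types import SimpleNamespace


def wait_for_background_process(stream, poll_interval=1.0, timeout=60.0,
                                sleep=time.sleep, clock=time.monotonic):
    """Poll stream.backgroundProcessInfo() until the task finishes.

    Returns the final status text; raises on FAILED/ABORTED or on timeout.
    """
    deadline = clock() + timeout
    while True:
        status = str(stream.backgroundProcessInfo().status).upper()
        if status == 'COMPLETED':
            return status
        if status in ('FAILED', 'ABORTED'):
            raise RuntimeError('background process ended with status ' + status)
        if clock() >= deadline:
            raise TimeoutError('background process did not finish in time')
        sleep(poll_interval)


class FakeSchemaStream:
    """Stand-in that reports a scripted sequence of statuses."""

    def __init__(self, statuses):
        self._statuses = iter(statuses)

    def backgroundProcessInfo(self):
        return SimpleNamespace(status=next(self._statuses))
```

With a real stream the call would follow stream.changeSchema(..., True); the sleep and clock parameters exist only so the loop can be exercised without waiting.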
TickDb
class TickDb(object)
The top-level interface to the methods of the Deltix Tick Database engine. Instances of this class are created by the static method createFromUrl:
db = dxapi.TickDb.createFromUrl('dxtick://localhost:8011')
or
db = dxapi.TickDb.createFromUrl('dxtick://localhost:8011', 'user', 'password')
createFromUrl
@staticmethod
def createFromUrl(url: str,
user: str = None,
password: str = None) -> "TickDb"
Creates a new database instance with the specified root folder, or URL.
Arguments:
url (str) - Connection URL.
user (str) - User.
password (str) - Password.
Returns:
TickDb - An un-opened TickDb instance.
openFromUrl
@staticmethod
@contextmanager
def openFromUrl(url: str,
readonly: bool,
user: str = None,
password: str = None)
Creates a new database instance with the specified root folder, or URL, and opens it.
Arguments:
url (str) - Connection URL.
readonly (bool) - Open data store in read-only mode.
user (str) - User.
password (str) - Password.
Returns:
TickDb - An opened TickDb instance.
setApplicationName
def setApplicationName(name: str) -> None
Set custom visible name of application.
Arguments:
namestr - Application name.
isReadOnly
def isReadOnly() -> bool
Determines whether the store is open as read-only.
isOpen
def isOpen() -> bool
Determines whether the store is open.
open
def open(readOnlyMode: bool) -> bool
Open the data store.
Arguments:
readOnlyMode (bool) - Open data store in read-only mode.
close
def close() -> None
Closes data store.
format
def format() -> bool
Create a new object on disk and format internally. The data store is left open for read-write at the end of this method.
generateSchema
def generateSchema(types: 'list[str]') -> str
Generates SchemaDef object from specified content types.
schema = db.generateSchema(['deltix.timebase.api.messages.TradeMessage',
'deltix.timebase.api.messages.BestBidOfferMessage'])
listStreams
def listStreams() -> 'list[TickStream]'
Enumerates existing streams.
Returns:
list[TickStream]- An array of existing stream objects.
getStream
def getStream(key: str) -> 'TickStream'
Looks up an existing stream by key.
Arguments:
key (str) - Identifies the stream.
Returns:
TickStream - A stream object, or None if the key was not found.
createStream
def createStream(key: str, options: StreamOptions) -> 'TickStream'
Creates a new stream within the database.
Arguments:
key (str) - A required key later used to identify the stream.
options (StreamOptions) - Options for creating the stream.
Returns:
TickStream - A new instance of TickStream.
createFileStream
def createFileStream(key: str, dataFile: str) -> 'TickStream'
Creates a new stream mounted to the given data file.
Arguments:
key (str) - A required key later used to identify the stream.
dataFile (str) - Path to the data file (on the server side).
Returns:
TickStream - A new instance of TickStream.
createCursor
def createCursor(stream: 'TickStream',
options: SelectionOptions) -> 'TickCursor'
Opens an initially empty cursor for reading data from multiple streams, according to the specified options. The messages are returned from the cursor strictly ordered by time. Within the same exact timestamp, the order of messages is undefined and may vary from call to call, i.e. it is non-deterministic.
The cursor is returned initially empty and must be reset. The TickCursor class provides methods for dynamically re-configuring the subscription, or jumping to a different timestamp.
Arguments:
stream (TickStream) - Stream from which data will be selected.
options (SelectionOptions) - Selection options.
Returns:
TickCursor - A cursor used to read messages.
tryCursor
@contextmanager
def tryCursor(stream: 'TickStream', options: SelectionOptions) -> 'TickCursor'
Context manager version of createCursor. Usage:
with db.tryCursor(stream, options) as cursor:
    while cursor.next():
        message = cursor.getMessage()
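The try* methods pair a create call with cleanup. A rough sketch of that pattern is shown below, assuming the context manager closes the cursor on exit (this reference does not spell that out). closing_cursor and FakeCursor are hypothetical and only demonstrate the shape.

```python
from contextlib import contextmanager


@contextmanager
def closing_cursor(create, *args):
    """Create a cursor with create(*args) and always close it afterwards."""
    cursor = create(*args)
    try:
        yield cursor
    finally:
        cursor.close()  # runs on normal exit and on exceptions


class FakeCursor:
    """Stand-in exposing only close(), to exercise the pattern offline."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True
```

With the real API, create would be something like db.createCursor and the arguments the stream and options; the benefit over a bare createCursor call is that the cursor is released even if reading raises.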
select
def select(timestamp: int, streams: 'list[TickStream]',
options: SelectionOptions, types: 'list[str]',
entities: 'list[InstrumentIdentity] | list[str]') -> 'TickCursor'
Opens a cursor for reading data from multiple streams, according to the specified options. The messages are returned from the cursor strictly ordered by time. Within the same exact time stamp, the order of messages is undefined and may vary from call to call, i.e. it is non-deterministic.
Note that the arguments of this method only determine the initial configuration of the cursor. The TickCursor class provides methods for dynamically re-configuring the subscription, or jumping to a different timestamp.
Arguments:
timestamp (int) - The start timestamp in millis.
streams (list[TickStream]) - Streams from which data will be selected.
options (SelectionOptions) - Selection options.
types (list[str]) - Message types to subscribe to. If None, all types will be subscribed.
entities (list[InstrumentIdentity] or list[str]) - Entities to subscribe to. If None, all entities will be subscribed.
Returns:
TickCursor - A cursor used to read messages.
trySelect
@contextmanager
def trySelect(
timestamp: int, streams: 'list[TickStream]', options: SelectionOptions,
types: 'list[str]',
entities: 'list[InstrumentIdentity] | list[str]') -> 'TickCursor'
Contextmanager version of select. Usage:
with db.trySelect(timestamp, streams, options, types, entities) as cursor:
    while cursor.next():
        message = cursor.getMessage()
createLoader
def createLoader(stream: 'TickStream',
options: LoadingOptions) -> 'TickLoader'
Creates a channel for loading data. The loader must be closed when the loading process is finished.
Arguments:
stream (TickStream) - Stream for loading data.
options (LoadingOptions) - Loading options.
Returns:
TickLoader - The created loader.
tryLoader
@contextmanager
def tryLoader(stream: 'TickStream', options: LoadingOptions) -> 'TickLoader'
Contextmanager version of createLoader. Usage:
with db.tryLoader(stream, options) as loader:
    loader.send(message)
executeQuery
def executeQuery(query: str,
options: SelectionOptions = None,
timestamp: int = JAVA_LONG_MIN_VALUE,
entities: 'list[InstrumentIdentity] | list[str]' = None,
params: 'list[QueryParameter]' = None) -> 'TickCursor'
Executes a query and creates a message source for reading data from it, according to the specified options. The messages are returned from the cursor strictly ordered by time. Within the same exact timestamp, the order of messages is undefined and may vary from call to call, i.e. it is non-deterministic.
Arguments:
query (str) - Query text element.
options (SelectionOptions) - Selection options.
timestamp (int) - The start timestamp in millis.
entities (list[InstrumentIdentity] or list[str]) - Entities to subscribe to. If None, all entities will be subscribed.
params (list[QueryParameter]) - The parameter values of the query.
Returns:
TickCursor - An iterable message source to read messages.
tryExecuteQuery
@contextmanager
def tryExecuteQuery(query: str,
options: SelectionOptions = None,
timestamp: int = JAVA_LONG_MIN_VALUE,
entities: 'list[InstrumentIdentity] | list[str]' = None,
params: 'list[QueryParameter]' = None) -> 'TickCursor'
Contextmanager version of executeQuery. Usage:
with db.tryExecuteQuery('select * from stream') as cursor:
    while cursor.next():
        message = cursor.getMessage()
createMultiplexedCursor
def createMultiplexedCursor(subscription: MultiplexerSubscription,
timestamp: int = None,
options: SelectionOptions = None) -> 'TickCursor'
Creates and returns a multiplexed TickCursor for the given subscription.
Arguments:
subscription: MultiplexerSubscription object defining feeds.
timestamp: Epoch time in milliseconds for the subscription (default: JAVA_LONG_MIN_VALUE).
options: SelectionOptions object for cursor configuration (default is an empty options object).
Raises:
ValueError: If subscription is None or contains no feeds.
TypeError: If subscription is not a MultiplexerSubscription object.
Returns:
TickCursor object for reading the multiplexed cursor.
tryMultiplexedCursor
@contextmanager
def tryMultiplexedCursor(subscription: MultiplexerSubscription,
timestamp: int = None,
options: SelectionOptions = None) -> 'TickCursor'
Contextmanager version of createMultiplexedCursor. Usage:
with db.tryMultiplexedCursor(subscription) as cursor:
    while cursor.next():
        message = cursor.getMessage()
TickStream
class TickStream(object)
The stream is a time series of messages for a number of financial instruments ('entities'). Messages can be price bars, trade ticks, bid/offer ticks, or any of the many more built-in and user-defined types. In the simplest case, a database will have a single stream of data. Multiple streams can be used to represent data of different frequencies, or completely different factors. For instance, separate streams can represent 1-minute price bars and ticks for the same set of entities. Or, you can have price bars and volatility bars in separate streams.
Get stream:
```
stream = tickdb.getStream('stream_key')
```
List streams:
```
streams = tickdb.listStreams()
```
key
def key() -> str
Returns the key, which uniquely identifies the stream within its database.
name
def name() -> str
Returns a user-readable short name.
distributionFactor
def distributionFactor() -> int
Returns the target number of files to be used for storing data.
description
def description() -> str
Returns a user-readable multi-line description.
owner
def owner() -> str
Returns stream owner.
location
def location() -> str
Returns stream location.
metadata
def metadata() -> str
Returns stream schema as dxapi.SchemaDef object.
schema
def schema() -> str
Returns stream schema as dxapi.SchemaDef object. (Alias for metadata)
metadataXml
def metadataXml() -> str
Returns stream schema (in xml format).
scope
def scope() -> StreamScope
Returns stream scope.
version
def version() -> str
Returns stream data format version.
highAvailability
def highAvailability() -> bool
Returns stream memory caching parameter. High availability durable streams are cached on startup.
unique
def unique() -> bool
Returns whether the stream is unique. Unique streams maintain an in-memory cache of recent messages. This concept assumes that stream messages have some field(s) marked as the primary key. The primary key may be a simple field (e.g. symbol) or composite (e.g. symbol and portfolio ID). For each key the TimeBase runtime maintains a copy of the last message received for this key (cache). Each new consumer receives a snapshot of the current cache at the beginning of a live data subscription.
polymorphic
def polymorphic() -> bool
Returns whether the stream is configured as polymorphic.
periodicity
def periodicity() -> str
Returns Stream periodicity, if known.
options
def options() -> StreamOptions
Returns stream options object.
describe
def describe() -> str
Returns stream DDL description.
changeSchema
def changeSchema(schema: 'SchemaDef | str',
mappings: 'dict' = None,
defaults: 'dict' = None,
background: bool = False) -> bool
Run schema change task for stream.
Arguments:
schema (SchemaDef or str(json)) - New stream schema;
mappings (dict) - column mappings in the format 'TypeName:FieldName', for example: { 'deltix.timebase.api.messages.TradeMessage:size' : 'deltix.timebase.api.messages.TradeMessage:newSizeField' };
defaults (dict) - default values for non-nullable columns. For example, { 'deltix.timebase.api.messages.TradeMessage:newSizeField' : '1.23' };
background (bool) - true to run the task in background mode; in this case the method returns right after the request is sent to the server. You can track the status of the background task with the backgroundProcessInfo() method and abort it with the abortBackgroundProcess() method.
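The 'TypeName:FieldName' keys used by mappings and defaults are plain strings, so they are easy to mistype. A small sketch of building them programmatically follows; field_key and rename_field_mapping are hypothetical helpers, not dxapi functions.

```python
def field_key(type_name: str, field_name: str) -> str:
    """Build a 'TypeName:FieldName' key as used by changeSchema dictionaries."""
    return f'{type_name}:{field_name}'


def rename_field_mapping(type_name: str, old_field: str, new_field: str) -> dict:
    """Mapping entry that renames one field within a single message type."""
    return {field_key(type_name, old_field): field_key(type_name, new_field)}


TRADE = 'deltix.timebase.api.messages.TradeMessage'
mappings = rename_field_mapping(TRADE, 'size', 'newSizeField')
defaults = {field_key(TRADE, 'newSizeField'): '1.23'}
```

The two dictionaries reproduce the mappings and defaults examples given in the argument descriptions.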
backgroundProcessInfo
def backgroundProcessInfo() -> 'BackgroundProcessInfo'
Gets stream background process information.
abortBackgroundProcess
def abortBackgroundProcess() -> None
Aborts active background process if any exists
listEntities
def listEntities(
instrumentTypes: 'list[InstrumentType]' = None
) -> 'list[InstrumentIdentity]'
Return list of instruments in stream.
Arguments:
instrumentTypes (list[InstrumentType]) - Instrument types for which entities should be selected. If None, all entities of the stream will be selected.
Returns:
list[InstrumentIdentity] - Stream instruments.
listSymbols
def listSymbols(instrumentTypes: 'list[InstrumentType]' = None) -> 'list[str]'
Return list of symbols in stream.
Arguments:
instrumentTypes (list[InstrumentType]) - Instrument types for which symbols should be selected. If None, all symbols of the stream will be selected.
Returns:
list[str] - Stream symbols.
truncate
def truncate(timestamp: int,
entities: 'list[InstrumentIdentity] | list[str]' = None) -> bool
Truncates stream data for the given entities from given time
Arguments:
timestamp (int) - Timestamp in millis. If the time is less than the stream start time, all stream data will be deleted.
entities (list[InstrumentIdentity] or list[str]) - A list of entities. If None, all stream entities will be used.
Returns:
bool - true if the stream was truncated successfully.
clear
def clear(entities: 'list[InstrumentIdentity] | list[str]' = None) -> bool
Clear stream data for the given entities.
Arguments:
entities (list[InstrumentIdentity] or list[str]) - A list of entities. If None, all stream entities will be used.
purge
def purge(timestamp: int) -> bool
Deletes stream data that is older than a specified time
Arguments:
timestamp (int) - Purge time in milliseconds.
Returns:
bool- true, if stream was purged successfully.
deleteData
def deleteData(
fromMs: int,
toMs: int,
entities: 'list[InstrumentIdentity] | list[str]' = None) -> bool
Deletes stream data for the given entities using specified time range.
Arguments:
fromMs (int) - Start timestamp (inclusive). Time is measured in milliseconds that passed since January 1, 1970 UTC.
toMs (int) - End timestamp (inclusive). Time is measured in milliseconds that passed since January 1, 1970 UTC. If the time is greater than the stream end time, all stream data after fromMs will be deleted for the given stream.
entities (list[InstrumentIdentity] or list[str]) - A list of entities. If None, all stream entities will be used.
Returns:
bool - true if the data was deleted successfully.
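Both bounds of deleteData are documented as inclusive. A local toy model over a list of timestamps shows what that means for messages that sit exactly on a boundary. delete_range is a hypothetical helper that touches no server.

```python
def delete_range(timestamps, from_ms, to_ms):
    """Return the timestamps that survive an inclusive [from_ms, to_ms] delete."""
    return [t for t in timestamps if not (from_ms <= t <= to_ms)]


remaining = delete_range([10, 20, 30, 40], 20, 30)
```

Messages stamped exactly 20 and 30 are removed along with anything in between, so only 10 and 40 remain.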
deleteStream
def deleteStream() -> bool
Deletes this stream
Returns:
bool- true, if stream was deleted successfully.
abortBackgroundProcess
def abortBackgroundProcess() -> bool
Aborts active background process if any exists
lock
def lock(options: 'LockType | LockOptions' = None) -> None
Acquire a lock of this stream. Default lock type is WRITE.
tryLock
def tryLock(options: 'LockType | LockOptions', timeout: 'int') -> None
Blocking operation that attempts to obtain given type of lock on this stream. If lock cannot be obtained during specified timeout operation fails with exception.
unlock
def unlock() -> None
Releases stream lock.
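Since lock and unlock must be paired, wrapping them in a context manager avoids leaving a stream locked after an error. The sketch below uses only the lock(options) and unlock() calls documented above; stream_lock and FakeLockableStream are hypothetical names.

```python
from contextlib import contextmanager


@contextmanager
def stream_lock(stream, options=None):
    """Hold a stream lock for the duration of a with-block."""
    stream.lock(options)  # None falls back to the documented default (WRITE)
    try:
        yield stream
    finally:
        stream.unlock()  # released even if the body raises


class FakeLockableStream:
    """Stand-in that records calls, to exercise the pattern offline."""

    def __init__(self):
        self.calls = []

    def lock(self, options=None):
        self.calls.append('lock')

    def unlock(self):
        self.calls.append('unlock')
```

With a real stream, options could be a dxapi.LockType or a dxapi.LockOptions instance, as in the LockOptions example.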
select
def select(timestamp: int, options: SelectionOptions, types: 'list[str]',
entities: 'list[InstrumentIdentity] | list[str]') -> 'TickCursor'
Opens a cursor for reading data from this stream, according to the specified options. The messages are returned from the cursor strictly ordered by time. Within the same exact time stamp, the order of messages is undefined and may vary from call to call, i.e. it is non-deterministic.
Note that the arguments of this method only determine the initial configuration of the cursor. The TickCursor interface provides methods for dynamically re-configuring the subscription, or jumping to a different timestamp.
Arguments:
timestamp (int) - The start timestamp in millis.
options (SelectionOptions) - Selection options.
types (list[str]) - Message types to subscribe to. If None, all types will be subscribed.
entities (list[InstrumentIdentity] or list[str]) - Entities to subscribe to. If None, all entities will be subscribed.
Returns:
TickCursor - A cursor used to read messages.
trySelect
@contextmanager
def trySelect(
timestamp: int, options: SelectionOptions, types: 'list[str]',
entities: 'list[InstrumentIdentity] | list[str]') -> 'TickCursor'
Contextmanager version of select. Usage:
with stream.trySelect(timestamp, options, types, entities) as cursor:
    while cursor.next():
        message = cursor.getMessage()
createCursor
def createCursor(options: SelectionOptions) -> 'TickCursor'
Creates a cursor for reading data from this stream, according to the specified options, but initially with a fully restricted filter. The user must call TickCursor.reset at least once, in order to begin retrieving data. This method is equivalent to (but is slightly more optimal than) calling createCursor(options)
Arguments:
options (SelectionOptions) - Selection options.
Returns:
TickCursor - A cursor used to read messages. Never null.
tryCursor
@contextmanager
def tryCursor(options: SelectionOptions) -> 'TickCursor'
Context manager version of createCursor. Usage:
with stream.tryCursor(options) as cursor:
    while cursor.next():
        message = cursor.getMessage()
createLoader
def createLoader(options: LoadingOptions) -> 'TickLoader'
Creates a channel for loading data. The loader must be closed when the loading process is finished.
Arguments:
options (LoadingOptions) - Loading options.
Returns:
TickLoader - The created loader.
tryLoader
@contextmanager
def tryLoader(options: LoadingOptions) -> 'TickLoader'
Contextmanager version of createLoader. Usage:
with stream.tryLoader(options) as loader:
    loader.send(message)
renameInstruments
def renameInstruments(source: 'list[InstrumentIdentity]',
target: 'list[InstrumentIdentity]') -> None
Renames list of instruments.
Arguments:
source (list[InstrumentIdentity]) - source entities list;
target (list[InstrumentIdentity]) - target entities list.
Example:
stream.renameInstruments([dxapi.InstrumentIdentity(dxapi.InstrumentType.EQUITY, 'GOOG'),
dxapi.InstrumentIdentity(dxapi.InstrumentType.EQUITY, 'IBM')],
[dxapi.InstrumentIdentity(dxapi.InstrumentType.EQUITY, 'GOOG_NEW'),
dxapi.InstrumentIdentity(dxapi.InstrumentType.FX, 'IBM_NEW')])
listSpaces
def listSpaces() -> 'list[str]'
Returns all created "spaces" for the stream. The default space is returned as "" (empty string). If the backing stream does not support spaces, None is returned.
renameSpace
def renameSpace(newName: str, oldName: str) -> None
Rename existing space.
Arguments:
newName (str) - new space name.
oldName (str) - space to rename.
deleteSpaces
def deleteSpaces(spaces: 'list[str]') -> None
Removes the given spaces permanently.
Arguments:
spaces (list[str]) - list of space names to delete.
getTimeRange
def getTimeRange(
entities: 'list[InstrumentIdentity] | list[str]' = None
) -> 'list[int]'
Return an inclusive range of times for which the specified entities have data in the database.
Arguments:
entities (list[InstrumentIdentity] or list[str]) - A list of entities or a list of symbols. If empty, returns the range for all entities.
getSpaceTimeRange
def getSpaceTimeRange(space: str) -> 'list[int]'
Returns an array consisting of two long timestamps (from and to), or None if no data was found.
Arguments:
space (str) - space name.
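The time-range methods return raw integers, which are hard to read in logs. A minimal conversion sketch follows, assuming the two values are epoch milliseconds like the other timestamps in this reference (the unit is not stated for these methods). range_to_datetimes is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def range_to_datetimes(time_range):
    """Convert [from_ms, to_ms] to a pair of UTC datetimes; pass None through."""
    if time_range is None:
        return None  # documented result when no data was found
    start_ms, end_ms = time_range
    return (EPOCH + timedelta(milliseconds=start_ms),
            EPOCH + timedelta(milliseconds=end_ms))
```

A typical call would be range_to_datetimes(stream.getTimeRange()), keeping the None case explicit for empty streams or spaces.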
TickCursor
class TickCursor(object)
A cursor (also known as iterator, or result set) for reading data from a stream. This class provides methods for dynamically reconfiguring the feed, as well as method reset for essentially re-opening the cursor on a completely different timestamp.
To get a cursor, use select method from TickDb or TickStream objects, or call executeQuery to open cursor to QQL result set.
A cursor can also be created with the createCursor method, but it is returned uninitialized, so it must be configured with types, entities, and a read time by calling reset:
```
options = dxapi.SelectionOptions()
cursor = tickdb.createCursor(stream, options)
cursor.subscribeToAllEntities()
cursor.subscribeToAllTypes()
cursor.reset(timestamp)
```
next
def next() -> bool
Moves cursor on to the next message. This method blocks until the next message becomes available, or until the cursor is determined to be at the end of the sequence. This method is illegal to call if isAtEnd() returns true.
Returns:
bool- false if at the end of the cursor.
getMessage
def getMessage() -> 'InstrumentMessage'
Returns an InstrumentMessage object cursor points at.
isAtEnd
def isAtEnd() -> bool
Returns true if the last call to next() returned false. Returns false if next() has not been called yet. This method is legal to call any number of times at any point in the cursor's lifecycle.
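The next()/getMessage() loop can be wrapped in a generator so that callers use a plain for-loop. The sketch below relies only on those two methods; iter_messages and ListCursor are hypothetical names, and ListCursor is a stand-in so the snippet runs without a server.

```python
def iter_messages(cursor, limit=None):
    """Yield messages from a cursor until next() returns false or limit is hit."""
    count = 0
    # The limit check comes first so no extra message is pulled past the limit.
    while (limit is None or count < limit) and cursor.next():
        yield cursor.getMessage()
        count += 1


class ListCursor:
    """Stand-in cursor backed by a list."""

    def __init__(self, items):
        self._items = list(items)
        self._pos = -1

    def next(self):
        self._pos += 1
        return self._pos < len(self._items)

    def getMessage(self):
        return self._items[self._pos]
```

Note that with live = True in SelectionOptions, next() waits for new data instead of returning false, so a limit (or an external stop condition) is what ends such a loop.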
nextIfAvailable
def nextIfAvailable() -> int
Moves the cursor on to the next message, but unlike next() this method does not block until the next message becomes available.
Returns:
NextResult- OK (0) if new message is available, END_OF_CURSOR (1) if cursor was closed, otherwise, UNAVAILABLE (2)
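A non-blocking read loop has to branch on the three result codes listed above. The sketch below uses the numeric values from this reference as local constants (whether the library exports named constants is not stated here). drain_available and ScriptedCursor are hypothetical.

```python
# Result codes as documented for nextIfAvailable().
OK, END_OF_CURSOR, UNAVAILABLE = 0, 1, 2


def drain_available(cursor):
    """Collect messages that are ready now; return (messages, at_end)."""
    messages = []
    while True:
        result = cursor.nextIfAvailable()
        if result == OK:
            messages.append(cursor.getMessage())
        elif result == END_OF_CURSOR:
            return messages, True
        else:  # UNAVAILABLE: nothing ready yet, let the caller do other work
            return messages, False


class ScriptedCursor:
    """Stand-in that replays (result_code, message) pairs."""

    def __init__(self, script):
        self._script = list(script)
        self._current = None

    def nextIfAvailable(self):
        code, self._current = self._script.pop(0)
        return code

    def getMessage(self):
        return self._current
```

Returning control on UNAVAILABLE, rather than spinning, lets the caller decide whether to sleep, poll another cursor, or do unrelated work.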
isClosed
def isClosed() -> bool
Returns true, if cursor was closed
close
def close() -> None
Close the cursor
getCurrentStreamKey
def getCurrentStreamKey() -> str
Return the key of the stream that is the source of the current message.
reset
def reset(timestamp: int,
entities: 'list[InstrumentIdentity] | list[str]' = None) -> None
Reposition the message source to a new point in time, while preserving current subscription.
Arguments:
timestamp (int) - The new position in time in millis.
entities (list[InstrumentIdentity] or list[str]) - list of entities to reset.
subscribeToAllEntities
def subscribeToAllEntities() -> None
Subscribe to all available entities.
clearAllEntities
def clearAllEntities() -> None
Switch to selective subscription mode (if necessary) and clear the list.
addEntity
def addEntity(entity: InstrumentIdentity) -> None
Add the specified entity to subscription.
Special note about options: the following fragment subscribes to the specific option contract "DAV 100417P00085000":
cursor.addEntity(dxapi.InstrumentIdentity(dxapi.InstrumentType.OPTION, 'DAV 100417P00085000'))
The following fragment subscribes to the option root (and you will get all instruments with this root):
cursor.addEntity(dxapi.InstrumentIdentity(dxapi.InstrumentType.OPTION, 'DAV '))
addSymbol
def addSymbol(symbol: str) -> None
Add the specified symbol to subscription.
addEntities
def addEntities(entities: 'list[InstrumentIdentity]') -> None
Bulk add the specified entities to subscription.
addSymbols
def addSymbols(symbols: 'list[str]') -> None
Bulk add the specified symbols to subscription.
removeEntities
def removeEntities(entities: 'list[InstrumentIdentity]') -> None
Remove the specified entities from subscription.
removeSymbols
def removeSymbols(symbols: 'list[str]') -> None
Remove the specified symbols from subscription.
removeEntity
def removeEntity(entity: InstrumentIdentity) -> None
Remove the specified entity from subscription.
removeSymbol
def removeSymbol(symbol: str) -> None
Remove the specified symbol from subscription.
subscribeToAllTypes
def subscribeToAllTypes() -> None
Subscribe to all available types (no filtering).
addTypes
def addTypes(types: 'list[str]') -> None
Add the specified type names to subscription.
removeTypes
def removeTypes(types: 'list[str]') -> None
Remove the specified types from subscription.
setTypes
def setTypes(types: 'list[str]') -> None
Subscribe to specified types.
add
def add(types: 'list[str]',
entities: 'list[InstrumentIdentity] | list[str]') -> None
Add the specified entities and types to subscription.
Arguments:
types (list[str]) - not-null array of type names to subscribe.
entities (list[InstrumentIdentity] or list[str]) - not-null array of instruments to subscribe.
remove
def remove(types: 'list[str]',
entities: 'list[InstrumentIdentity] | list[str]') -> None
Remove the specified entities and types from subscription.
Arguments:
types (list[str]) - not-null array of type names to unsubscribe.
entities (list[InstrumentIdentity] or list[str]) - not-null array of instruments to unsubscribe.
addStreams
def addStreams(streams: 'list[TickStream]') -> None
Add streams to subscription. The current time and filter are used to query data from the new sources.
Arguments:
streams (list[TickStream]) - Streams to add.
removeStreams
def removeStreams(streams: 'list[TickStream]') -> None
Remove streams from subscription.
Arguments:
streams (list[TickStream]) - Streams to remove.
removeAllStreams
def removeAllStreams() -> None
Remove all streams from subscription.
setTimeForNewSubscriptions
def setTimeForNewSubscriptions(timestamp: int) -> None
This method affects subsequent "add subscription" calls, such as addEntity(). New subscriptions start at the specified time.
Arguments:
timestamp (int) - The time to use.
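The unit of `timestamp` is not stated for this method. The sketch below assumes epoch milliseconds, as documented for `InstrumentMessage.setTimestamp`, and shows one way to derive such a value from a `datetime`. `to_epoch_millis` is a hypothetical helper, not a dxapi function, and naive datetimes are treated as UTC by assumption.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(moment: datetime) -> int:
    """Convert a datetime to whole milliseconds since the Unix epoch.

    Naive datetimes are interpreted as UTC (an assumption of this sketch).
    """
    if moment.tzinfo is None:
        moment = moment.replace(tzinfo=timezone.utc)
    # Integer division of timedeltas avoids floating-point rounding.
    return (moment - EPOCH) // timedelta(milliseconds=1)


start_millis = to_epoch_millis(datetime(2020, 1, 1, tzinfo=timezone.utc))
# With a real cursor, the value would then be passed on, for example:
# cursor.setTimeForNewSubscriptions(start_millis)
```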
TickLoader
class TickLoader(object)
Object which consumes messages.
Create loader from TickDb:
options = dxapi.LoadingOptions()
loader = tickdb.createLoader(stream, options)
Create loader from TickStream:
options = dxapi.LoadingOptions()
loader = stream.createLoader(options)
send
def send(message: InstrumentMessage) -> None
This method is invoked to send a message to the object.
Arguments:
message (InstrumentMessage) - A temporary buffer with the message. By convention, the message is only valid for the duration of this call.
flush
def flush() -> None
Flushes all buffered messages by sending them to the server. Note that calling the 'send' method alone does not guarantee that all messages will be delivered to and stored on the server.
isClosed
def isClosed() -> bool
Returns true if the loader was closed.
close
def close() -> None
Flushes and closes the loader
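Because buffered messages are only guaranteed to reach the server after a flush or close, it can be convenient to tie `close()` to a `with` block. The sketch below is one possible wrapper, not part of dxapi: `closing_loader` is a hypothetical helper, and `RecordingLoader` is a stand-in that only mimics the `send`, `flush`, `isClosed` and `close` method names so the example runs without a server.

```python
from contextlib import contextmanager


@contextmanager
def closing_loader(loader):
    """Yield the loader and close it on exit, even if the body raises."""
    try:
        yield loader
    finally:
        if not loader.isClosed():
            loader.close()  # documented above as "flushes and closes"


class RecordingLoader:
    """Stand-in with TickLoader-like method names, for illustration only."""

    def __init__(self):
        self.sent = []
        self.closed = False

    def send(self, message):
        self.sent.append(message)

    def flush(self):
        pass

    def isClosed(self):
        return self.closed

    def close(self):
        self.closed = True


demo = RecordingLoader()
with closing_loader(demo) as loader:
    loader.send("message-1")
    loader.flush()
```

With a real loader, the object returned by `createLoader` would take the place of `RecordingLoader()`.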
addListener
def addListener(listener: 'ErrorListener') -> None
Register an error listener. All data writing errors will be delivered to the listener. Note that the listener instance should be stored in a variable and shouldn't be destroyed until you close the loader.
Arguments:
listener (ErrorListener) - error listener to register.
removeListener
def removeListener(listener: 'ErrorListener') -> None
Unsubscribe registered error listener.
Arguments:
listener (ErrorListener) - error listener to unsubscribe.
nErrorListeners
def nErrorListeners() -> int
Returns number of registered error listeners
registerType
def registerType(type: str) -> int
Registers the type of the message to be sent and returns its type id. For performance reasons, you can specify the type id instead of the type name, for example:
message = dxapi.InstrumentMessage()
message.typeId = loader.registerType("deltix.timebase.api.messages.universal.PackageHeader")
# as an alternative, you could write:
# message.typeName = "deltix.timebase.api.messages.universal.PackageHeader"
loader.send(message)
Arguments:
type (str) - name of type to register.
Returns:
int - id of registered type.
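When many messages of the same type are sent, the id returned by `registerType` can be looked up once and reused. The sketch below shows one way to do that; `TypeIdCache` is a hypothetical helper and `CountingLoader` is a stand-in that only mimics the `registerType` method name. Whether repeated `registerType` calls are cheap on a real loader is not stated in this reference, so the cache is an assumption about usage rather than a requirement.

```python
class TypeIdCache:
    """Remember type ids so each type name is registered only once."""

    def __init__(self, loader):
        self._loader = loader
        self._ids = {}

    def type_id(self, type_name: str) -> int:
        if type_name not in self._ids:
            self._ids[type_name] = self._loader.registerType(type_name)
        return self._ids[type_name]


class CountingLoader:
    """Stand-in that hands out sequential ids, for illustration only."""

    def __init__(self):
        self.calls = []

    def registerType(self, type_name):
        self.calls.append(type_name)
        return len(self.calls) - 1


demo_loader = CountingLoader()
cache = TypeIdCache(demo_loader)
name = "deltix.timebase.api.messages.universal.PackageHeader"
first = cache.type_id(name)
second = cache.type_id(name)  # served from the cache, no second registration
```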
registerInstrument
def registerInstrument(type: InstrumentType, symbol: str) -> int
Registers the instrument of the message to be sent and returns its instrument id. For performance reasons, you can specify the instrument id instead of the symbol and instrument type, for example:
message = dxapi.InstrumentMessage()
message.instrumentId = loader.registerInstrument(dxapi.InstrumentType.EQUITY, 'AAPL')
# as an alternative, you could write:
# message.instrumentType = 'EQUITY'
# message.symbol = 'AAPL'
loader.send(message)
Arguments:
type (InstrumentType) - type of instrument.
symbol (str) - instrument ticker.
Returns:
int - id of registered instrument.
registerInstrumentIdentity
def registerInstrumentIdentity(instrument: InstrumentIdentity) -> int
Registers the instrument of the message to be sent and returns its instrument id. For performance reasons, you can specify the instrument id instead of the symbol and instrument type, for example:
message = dxapi.InstrumentMessage()
message.instrumentId = loader.registerInstrumentIdentity(dxapi.InstrumentIdentity(dxapi.InstrumentType.EQUITY, 'AAPL'))
# as an alternative, you could write:
# message.instrumentType = 'EQUITY'
# message.symbol = 'AAPL'
loader.send(message)
Arguments:
instrument (InstrumentIdentity) - instrument identity (instrument type and symbol) to register.
Returns:
int - id of registered instrument.
ErrorListener
class ErrorListener(object)
Listener for errors related to loading/sending data into a TickLoader.
Usage:
```
class MyErrorListener(dxapi.ErrorListener):
    def onError(self, errorClass, errorMsg):
        print('Error: ' + errorMsg)

listener = MyErrorListener()  # keep a reference so the listener is not destroyed
loader.addListener(listener)
loader.send(message)
...
loader.removeListener(listener)
loader.close()
```
SchemaDef
class SchemaDef()
TimeBase stream schema definition class.
Example:
```
schema = stream.metadata()
tradeType = schema.findType('TradeMessage')
```
Properties:
all (list[TypeDef]) - all types of the stream schema (including content classes, non-content classes and enums).
types (list[TypeDef]) - content (top-level) types of the stream.
TypeDef
class TypeDef()
TimeBase stream type definition class.
Example:
```
schema = stream.metadata()
tradeType = schema.findType('TradeMessage')
```
Properties:
name (str) - name of type.
title (str) - title of type.
fields (list[FieldDef]) - list of type fields.
isEnum (bool) - true if type is an enum.
parent (str) - name of parent type.
isAbstract (bool) - true if type is abstract.
FieldDef
class FieldDef()
TimeBase type field definition class.
Example:
```
schema = stream.metadata()
tradeType = schema.findType('TradeMessage')
priceField = tradeType.findField('price')
```
Properties:
name (str) - name of field.
title (str) - title of field.
description (str) - description of field.
type (DataTypeDef) - type of field.
static (bool) - true if field is static.
value (str) - static value of field.
relativeTo (str) - field name (used to specify that field decoding depends on another field value).
primaryKey (bool) - true if field is a part of the message's primary key.
hide (bool) - true if field is hidden.
DataTypeDef
class DataTypeDef()
TimeBase field type definition class.
Example:
```
schema = stream.metadata()
tradeType = schema.findType('TradeMessage')
priceField = tradeType.findField('price')
priceField.type.encoding = 'IEEE64'
```
Properties:
name (str) - name of field type.
encoding (str) - encoding of field type.
nullable (bool) - true for nullable field types.
types (list[str]) - list of schema type names for polymorphic types (for objects).
elementType (str) - type name of array element data type (for arrays).
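The schema classes above nest as SchemaDef → TypeDef → FieldDef → DataTypeDef. The sketch below walks that structure using only the property names listed in this reference (`types`, `name`, `fields`, `type`, `nullable`). `describe_schema` is a hypothetical helper, and the stand-in schema built from `SimpleNamespace` uses made-up type and field values so the example runs without a TimeBase connection.

```python
from types import SimpleNamespace


def describe_schema(schema):
    """Return one line per content type and one indented line per field."""
    lines = []
    for type_def in schema.types:
        lines.append(type_def.name)
        for field in type_def.fields:
            suffix = "?" if field.type.nullable else ""  # mark nullable fields
            lines.append(f"  {field.name}: {field.type.name}{suffix}")
    return lines


# Stand-in objects with illustrative values; a real schema would come
# from stream.metadata().
demo_schema = SimpleNamespace(types=[
    SimpleNamespace(name="TradeMessage", fields=[
        SimpleNamespace(name="price",
                        type=SimpleNamespace(name="FLOAT", nullable=True)),
        SimpleNamespace(name="size",
                        type=SimpleNamespace(name="FLOAT", nullable=False)),
    ]),
])

for line in describe_schema(demo_schema):
    print(line)
```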
MultiplexerFeed
class MultiplexerFeed()
__init__
def __init__(streams: 'list[TickStream]' = None,
timestamp: int = None,
types: 'list[str]' = None,
entities: 'list[InstrumentIdentity] | list[str]' = None,
qql: str = None,
qql_params: 'list[QueryParameter]' = None)
Initialize a MultiplexerFeed.
Arguments:
streams (list[TickStream]): Streams from which data will be selected.
timestamp (int): The start timestamp in millis.
types (list[str]): Message types to subscribe to. If None, all types are subscribed.
entities (list[InstrumentIdentity] | list[str]): Entities to subscribe to. If None, all entities are subscribed.
qql (str): Query string.
qql_params (list[QueryParameter]): The parameter values of the query.
to_dict
def to_dict()
Convert the MultiplexerFeed to a dictionary.
MultiplexerSubscription
class MultiplexerSubscription()
__init__
def __init__()
Initialize the subscription with an empty list of feeds.
subscribe_stream
def subscribe_stream(stream: 'TickStream',
timestamp: int = None,
types: 'list[str]' = None,
entities: 'list[InstrumentIdentity | str]' = None)
Create a new feed using specified criteria and add it to the subscription.
Arguments:
stream (TickStream): Stream from which data will be selected.
timestamp (int): The start timestamp in millis.
types (list[str]): Message types to subscribe to. If None, all types are subscribed.
entities (list[InstrumentIdentity] | list[str]): Entities to subscribe to. If None, all entities are subscribed.
subscribe_streams
def subscribe_streams(streams: 'list[TickStream]',
timestamp: int = None,
types: 'list[str]' = None,
entities: 'list[InstrumentIdentity | str]' = None)
Create a new feed using specified criteria and add it to the subscription.
Arguments:
streams (list[TickStream]): Streams from which data will be selected.
timestamp (int): The start timestamp in millis.
types (list[str]): Message types to subscribe to. If None, all types are subscribed.
entities (list[InstrumentIdentity] | list[str]): Entities to subscribe to. If None, all entities are subscribed.
subscribe_qql
def subscribe_qql(qql: str,
timestamp: int = None,
qql_params: 'list[QueryParameter]' = None)
Create a new feed using a query (qql) and add it to the subscription.
Arguments:
qql (str): Query string.
timestamp (int): The start timestamp in millis.
qql_params (list[QueryParameter]): The parameter values of the query.
add_feed
def add_feed(feed: MultiplexerFeed)
Add a feed to the subscription.
add_feeds
def add_feeds(feeds: 'list[MultiplexerFeed]' = [])
Add feeds to the subscription.
Arguments:
feeds: List of MultiplexerFeed objects to add.
to_dict
def to_dict()
Convert the subscription to a dictionary.